package day02

import java.util
import java.util.{Date, Random}
import java.util.concurrent.CountedCompleter

import com.alibaba.fastjson.{JSON, JSONObject}
import day01.MyAccumulator
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import sessionanalyze.constant.Constants
import sessionanalyze.dao.ISessionDetailDAO
import sessionanalyze.dao.factory.DAOFactory
import sessionanalyze.dao.impl.SessionAggrStatDAOImpl
import sessionanalyze.domain._
import sessionanalyze.test.MockData
import sessionanalyze.util._

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/*
                    .::::.
                  .::::::::.
                 :::::::::::
             ..:::::::::::'
           '::::::::::::'		Goddess bless: may this code never have a bug
             .::::::::::
        '::::::::::::::..
             ..::::::::::::.
           ``::::::::::::::::
            ::::``:::::::::'        .:::.
           ::::'   ':::::'       .::::::::.
         .::::'      ::::     .:::::::'::::.
        .:::'       :::::  .:::::::::' ':::::.
       .::'        :::::.:::::::::'      ':::::.
      .::'         ::::::::::::::'         ``::::.
  ...:::           ::::::::::::'              ``::.
 ```` ':.          ':::::::::'                  ::::..
                    '.:::::'                    ':'````..
                    
 ━━━━━━━━━━━━━━━━━━━━ 女神保佑,永无BUG ━━━━━━━━━━━━━━━━━━━━
*/
object UserActionAnalyze {
  /**
   * Entry point of the session analysis job.
   *
   * Creates a local SparkContext and SparkSession, runs the analysis pipeline for the task
   * selected by `args` and always stops the SparkContext afterwards, even if a stage fails.
   *
   * @param args command-line arguments; the task id is read from them by
   *             `ParamUtils.getTaskIdFromArgs`
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(Constants.SPARK_APP_NAME_SESSION).setMaster("local")
    val sc = new SparkContext(conf)
    // getOrCreate attaches to the already running SparkContext created above
    val sparkSession = SparkSession.builder().appName(Constants.SPARK_APP_NAME_SESSION).getOrCreate()
    try {
      runSessionAnalysis(sc, sparkSession, args)
    } finally {
      // release the local Spark resources whether or not the pipeline succeeded
      sc.stop()
    }
  }

  /**
   * Runs requirements 1, 2, 4 and 5 for the task selected by `args`.
   * Requirement 3 (random extraction) is currently disabled.
   */
  private def runSessionAnalysis(sc: SparkContext, sparkSession: SparkSession, args: Array[String]): Unit = {
    // generate mock input data
    MockData.mock(sc, sparkSession)

    val taskDAO = DAOFactory.getTaskDAO
    // task id from the arguments; SPARK_LOCAL_TASKID_SESSION is passed as the local-mode key
    val taskId = ParamUtils.getTaskIdFromArgs(args, Constants.SPARK_LOCAL_TASKID_SESSION)
    val task = taskDAO.findById(taskId)
    if (task == null) {
      println("没有获取到对应的Task信息")
      return
    }
    // the task parameters are stored as a JSON string
    val taskParam = JSON.parseObject(task.getTaskParam)

    // raw actions inside the task's date range
    val actionRDD: RDD[Row] = getActionRDDByDateRange(sparkSession, taskParam)

    // (sessionId, action row); reused by several stages below
    val sessionIdActionRDD: RDD[(String, Row)] = actionRDD.map(row => (row(2).toString, row))
    sessionIdActionRDD.cache()

    // (sessionId, aggregated session info joined with user info)
    val sessionIdAggregateInfoRDD = aggregateByUserId(sparkSession, sessionIdActionRDD)

    // custom accumulator collecting visit-length / step-length buckets
    val accumulator = new SessionAccumulator
    sc.register(accumulator)

    // Requirement 1: filter sessions by the task parameters. The filter also updates the
    // accumulator, so the result is cached; the later join then normally reads the cached
    // partitions instead of re-running the filter.
    val filterUserInfoRDD: RDD[(String, String)] =
      filterSessionUserInfoRDD(sessionIdAggregateInfoRDD, taskParam, accumulator)
    filterUserInfoRDD.cache()

    // first action on the filtered RDD: this is what populates the accumulator
    val count: Long = filterUserInfoRDD.count()
    println(count)

    // Requirement 2: ratio of each visit-length / step-length bucket
    countedPercentage(count, accumulator.value, taskId)

    // (sessionId, action row) for the sessions that passed the filter; reused repeatedly
    val sessionDetailRDD = getSessionIdDetailRDD(sessionIdActionRDD, filterUserInfoRDD)
    sessionDetailRDD.cache()

    // Requirement 3: sample sessions proportionally per hour (disabled)
    // extractSessionByRatio(sc, taskId, filterUserInfoRDD, sessionDetailRDD)

    // Requirement 4: top-10 categories by click, order and pay counts
    val top10CategoryId: Array[Long] = getTop10Category(sessionDetailRDD, taskId)

    // Requirement 5: top-10 clicking sessions for each of those categories
    getTop10CategorySession(taskId, top10CategoryId, sessionDetailRDD)
  }

  //需求5：对于排名前10的品类分别获取其点击次数前10的session
  /**
   * Requirement 5: for each top category, finds the 10 sessions with the most clicks on it.
   *
   * Those sessions are stored in the Top10Session table. Every action row of those sessions is
   * stored through the session-detail DAO's `insertBatch`.
   *
   * @param taskId           id of the task the results belong to
   * @param top10CategoryId  category ids to process (as returned by getTop10Category)
   * @param sessionDetailRDD `(sessionId, actionRow)` pairs; column 6 of a row is the clicked
   *                         category id and may be null
   */
  def getTop10CategorySession(taskId: Long, top10CategoryId: Array[Long], sessionDetailRDD: RDD[(String, Row)]): Unit = {
    val top10SessionDAO = DAOFactory.getTop10SessionDAO

    for (categoryId <- top10CategoryId) {
      // Click count per session for this category. Rows without a clicked category never
      // match, even when categoryId happens to be 0.
      val top10: Array[(String, Long)] = sessionDetailRDD
        .filter { case (_, row) => !row.isNullAt(6) && row.getLong(6) == categoryId }
        .map { case (sessionId, _) => (sessionId, 1L) }
        .reduceByKey(_ + _)
        .sortBy(_._2, ascending = false)
        .take(10)

      // (sessionId, clickCount) rows for the Top10Session table
      for ((sessionId, clickCount) <- top10) {
        val top10Session = new Top10Session
        top10Session.setTaskid(taskId)
        top10Session.setCategoryid(categoryId)
        top10Session.setSessionid(sessionId)
        top10Session.setClickCount(clickCount)
        top10SessionDAO.insert(top10Session)
      }

      // Store the details of all top sessions of this category in a single pass over the
      // data, instead of one full scan per session.
      val topSessionIds: Set[String] = top10.map(_._1).toSet
      if (topSessionIds.nonEmpty) {
        sessionDetailRDD
          .filter { case (sessionId, _) => topSessionIds.contains(sessionId) }
          .foreachPartition { partition =>
            // detail rows of this partition, inserted as one batch
            val sessionDetails = new util.ArrayList[SessionDetail]
            partition.foreach { case (_, row) =>
              val sessionDetail = new SessionDetail
              sessionDetail.setTaskid(taskId)
              sessionDetail.setUserid(row.getLong(1))
              sessionDetail.setSessionid(row.getString(2))
              sessionDetail.setPageid(row.getLong(3))
              sessionDetail.setActionTime(row.getString(4))
              sessionDetail.setSearchKeyword(row.getString(5))
              sessionDetail.setClickCategoryId(if (row.get(6) != null) row.getLong(6) else 0)
              sessionDetail.setClickProductId(if (row.get(7) != null) row.getLong(7) else 0)
              sessionDetail.setOrderCategoryIds(row.getString(8))
              sessionDetail.setOrderProductIds(row.getString(9))
              sessionDetail.setPayCategoryIds(row.getString(10))
              sessionDetail.setPayProductIds(row.getString(11))
              sessionDetails.add(sessionDetail)
            }
            if (!sessionDetails.isEmpty) {
              // The DAO is obtained on the executor so it is not captured by the serialized
              // closure.
              DAOFactory.getSessionDetailDAO.insertBatch(sessionDetails)
              println("------> " + sessionDetails.size())
            }
          }
      }
    }
    println("数据插入top10_category_session表、session_detail表 完成 " )
  }

  //计算哪些品类被用户访问（点击、下单、支付）
  /**
   * Requirement 4: ranks every visited category by its (click, order, pay) counts.
   *
   * The 10 best-ranked categories are written to the Top10Category table.
   *
   * @param sessionDetailRDD `(sessionId, actionRow)` pairs of the filtered sessions
   * @param taskId           id of the task the results belong to
   * @return ids of the (at most) 10 best-ranked categories, best first
   */
  def getTop10Category(sessionDetailRDD: RDD[(String, Row)], taskId: Long): Array[Long] = {
    // each category touched by a click, order or pay action, exactly once
    val visitedCategoryRDD: RDD[(Long, Long)] = getAllSessionVisitedCategory(sessionDetailRDD).distinct()

    // per category: "field=value|..." string holding its id and click/order/pay counts
    val fullCountRDD: RDD[(Long, String)] = getJoinedCategoryFullCountRDD(
      visitedCategoryRDD,
      getClickCategoryCountRDD(sessionDetailRDD),
      getOrderCategoryCountRDD(sessionDetailRDD),
      getPayCategoryCountRDD(sessionDetailRDD)
    )

    // parse each info string once, sort descending by the composite key, keep the best 10
    val ranked: Array[(Long, Long, Long, Long)] = fullCountRDD
      .map { case (_, info) =>
        val clicks = StringUtils.getFieldFromConcatString(info, "\\|", Constants.FIELD_CLICK_COUNT).toLong
        val orders = StringUtils.getFieldFromConcatString(info, "\\|", Constants.FIELD_ORDER_COUNT).toLong
        val pays = StringUtils.getFieldFromConcatString(info, "\\|", Constants.FIELD_PAY_COUNT).toLong
        val category = StringUtils.getFieldFromConcatString(info, "\\|", Constants.FIELD_CATEGORY_ID).toLong
        (new CategorySortKey(clicks, orders, pays), (category, clicks, orders, pays))
      }
      .sortByKey(ascending = false)
      .take(10)
      .map(_._2)

    val top10CategoryDAO = DAOFactory.getTop10CategoryDAO
    ranked.foreach { case (category, clicks, orders, pays) =>
      val record = new Top10Category
      record.setTaskid(taskId)
      record.setCategoryid(category)
      record.setClickCount(clicks)
      record.setOrderCount(orders)
      record.setPayCount(pays)
      top10CategoryDAO.insert(record)
    }
    println("插入数据表Top10Category成功 ")

    ranked.map { case (category, _, _, _) => category }
  }

  /**
   * Left-joins click, order and pay counts onto every visited category.
   *
   * Each category is rendered as
   * `<FIELD_CATEGORY_ID>=id|<FIELD_CLICK_COUNT>=n|<FIELD_ORDER_COUNT>=n|<FIELD_PAY_COUNT>=n`.
   * A count missing from its RDD is written as 0.
   *
   * @return `(categoryId, infoString)` pairs, one per element of `categoryRDD`
   */
  def getJoinedCategoryFullCountRDD(categoryRDD: RDD[(Long, Long)], clickCategoryCountRDD: RDD[(Long, Long)], orderCategoryCountRDD: RDD[(Long, Long)], payCategoryCountRDD: RDD[(Long, Long)]): RDD[(Long, String)] = {
    // "categoryid=..|clickCount=..|"
    val withClicks: RDD[(Long, String)] = categoryRDD
      .leftOuterJoin(clickCategoryCountRDD)
      .map { case (categoryId, (_, clicks)) =>
        (categoryId, s"${Constants.FIELD_CATEGORY_ID}=$categoryId|${Constants.FIELD_CLICK_COUNT}=${clicks.getOrElse(0L)}|")
      }

    // + "orderCount=..|"
    val withOrders: RDD[(Long, String)] = withClicks
      .leftOuterJoin(orderCategoryCountRDD)
      .map { case (categoryId, (prefix, orders)) =>
        (categoryId, s"$prefix${Constants.FIELD_ORDER_COUNT}=${orders.getOrElse(0L)}|")
      }

    // + "payCount=.." (no trailing separator)
    withOrders
      .leftOuterJoin(payCategoryCountRDD)
      .map { case (categoryId, (prefix, pays)) =>
        (categoryId, s"$prefix${Constants.FIELD_PAY_COUNT}=${pays.getOrElse(0L)}")
      }
  }

  /**
   * Counts pay actions per category.
   *
   * Column 10 of an action row holds a comma-separated list of paid category ids, or null when
   * none is recorded. Every listed id counts once. Blank tokens (e.g. from a trailing or
   * doubled comma) are skipped instead of failing `toLong`.
   *
   * @param sessionDetailRDD `(sessionId, actionRow)` pairs
   * @return `(categoryId, payCount)` pairs
   */
  def getPayCategoryCountRDD(sessionDetailRDD: RDD[(String, Row)]): RDD[(Long, Long)] = {
    sessionDetailRDD
      .flatMap { case (_, row) =>
        val payCategoryIds = row.getString(10)
        if (payCategoryIds == null) Nil
        else payCategoryIds.split(",").iterator
          .map(_.trim)
          .filter(_.nonEmpty)
          .map(id => (id.toLong, 1L))
          .toList
      }
      .reduceByKey(_ + _)
  }

  /**
   * Counts order actions per category.
   *
   * Column 8 of an action row holds a comma-separated list of ordered category ids, or null when
   * none is recorded. Every listed id counts once. Blank tokens (e.g. from a trailing or
   * doubled comma) are skipped instead of failing `toLong`.
   *
   * @param sessionDetailRDD `(sessionId, actionRow)` pairs
   * @return `(categoryId, orderCount)` pairs
   */
  def getOrderCategoryCountRDD(sessionDetailRDD: RDD[(String, Row)]): RDD[(Long, Long)] = {
    sessionDetailRDD
      .flatMap { case (_, row) =>
        val orderCategoryIds = row.getString(8)
        if (orderCategoryIds == null) Nil
        else orderCategoryIds.split(",").iterator
          .map(_.trim)
          .filter(_.nonEmpty)
          .map(id => (id.toLong, 1L))
          .toList
      }
      .reduceByKey(_ + _)
  }


  /**
   * Counts click actions per category.
   *
   * A row is a click when column 6 (the clicked category id) is non-null.
   *
   * @param sessionDetailRDD `(sessionId, actionRow)` pairs
   * @return `(categoryId, clickCount)` pairs
   */
  def getClickCategoryCountRDD(sessionDetailRDD: RDD[(String, Row)]): RDD[(Long, Long)] =
    sessionDetailRDD
      .collect { case (_, row) if row.get(6) != null => (row.getLong(6), 1L) }
      .reduceByKey(_ + _)

  /**
   * Lists every category touched by an action, as `(categoryId, categoryId)` pairs.
   *
   * The categories come from the clicked category (column 6), the ordered categories
   * (column 8) and the paid categories (column 10); the last two are comma-separated lists.
   * Duplicates are kept; callers apply `distinct()`. Blank list tokens are skipped.
   *
   * @param sessionDetailRDD `(sessionId, actionRow)` pairs
   */
  def getAllSessionVisitedCategory(sessionDetailRDD: RDD[(String, Row)]): RDD[(Long, Long)] =
    sessionDetailRDD.flatMap { case (_, row) =>
      // comma-separated id list -> ids; null or blank tokens contribute nothing
      def parseIds(csv: String): List[Long] =
        if (csv == null) Nil
        else csv.split(",").iterator.map(_.trim).filter(_.nonEmpty).map(_.toLong).toList

      val clicked: List[Long] = if (row.isNullAt(6)) Nil else List(row.getLong(6))
      val ordered: List[Long] = parseIds(row.getString(8))
      val paid: List[Long] = parseIds(row.getString(10))
      (clicked ++ ordered ++ paid).map(id => (id, id))
    }




  /**
   * Requirement 3: randomly samples sessions in proportion to how many sessions started in each
   * hour.
   *
   * The sampled sessions are written to the SessionRandomExtract table. Their action rows are
   * written through the session-detail DAO's `insertBatch`.
   *
   * The quota `extractTotal` is split evenly across days (integer division). Each hour then gets
   * a share of its day's quota proportional to its session count. The share is rounded down and
   * capped at the hour's session count. If there are no sessions at all, nothing is sampled.
   *
   * @param sc                SparkContext used to broadcast the sampled indexes
   * @param taskId            id of the task the results belong to
   * @param filterUserInfoRDD filtered `(sessionId, aggrInfo)` pairs
   * @param sessionDetailRDD  filtered `(sessionId, actionRow)` pairs
   * @param extractTotal      number of sessions to aim for across all days (default 100)
   */
  def extractSessionByRatio(sc: SparkContext, taskId: Long, filterUserInfoRDD: RDD[(String, String)], sessionDetailRDD: RDD[(String, Row)], extractTotal: Int = 100): Unit = {
    // (dateHour, aggrInfo); dateHour is split on "_" into date and hour below
    val timeSessionIdRDD: RDD[(String, String)] = filterUserInfoRDD.map { case (_, aggrInfo) =>
      val startTime = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_START_TIME)
      (DateUtils.getDateHour(startTime), aggrInfo)
    }
    // consumed twice (countByKey and groupByKey); cache to avoid recomputing the lineage
    timeSessionIdRDD.cache()

    try {
      // <date, <hour, sessionCount>>
      val dateHourCountMap = new mutable.HashMap[String, mutable.HashMap[String, Long]]()
      for ((dateHour, count) <- timeSessionIdRDD.countByKey()) {
        val parts = dateHour.split("_")
        dateHourCountMap.getOrElseUpdate(parts(0), new mutable.HashMap[String, Long]()).put(parts(1), count)
      }

      if (dateHourCountMap.isEmpty) {
        // no sessions: nothing to sample, and no days to divide the quota by
        println("No sessions to extract")
      } else {
        val extractPerDay = extractTotal / dateHourCountMap.size
        val random = new Random()

        // <date, <hour, positions (within that hour's group) of the sessions to extract>>
        val dateHourExtractMap: Map[String, Map[String, Set[Int]]] = dateHourCountMap.map { case (date, hourCountMap) =>
          val daySessionCount = hourCountMap.values.sum
          val hourIndexes: Map[String, Set[Int]] = hourCountMap.map { case (hour, count) =>
            // proportional share of the day's quota, never more than the hour's sessions
            val quota = math.min((count.toDouble / daySessionCount * extractPerDay).toInt, count.toInt)
            val picked = mutable.HashSet[Int]()
            while (picked.size < quota) picked += random.nextInt(count.toInt)
            (hour, picked.toSet)
          }.toMap
          (date, hourIndexes)
        }.toMap

        // read by every task of the grouping stage below, so ship it once per executor
        val dateHourExtractMapBroad = sc.broadcast(dateHourExtractMap)

        // store the sampled sessions; yields (sessionId, sessionId) for the detail join
        val extractSessionIdsRDD: RDD[(String, String)] = timeSessionIdRDD.groupByKey().flatMap { case (dateHour, aggrInfos) =>
          val parts = dateHour.split("_")
          val extractIndexes: Set[Int] = dateHourExtractMapBroad.value
            .get(parts(0))
            .flatMap(_.get(parts(1)))
            .getOrElse(Set.empty[Int])

          val sessionRandomExtractDAO = DAOFactory.getSessionRandomExtractDAO
          val extracted: List[(String, String)] = aggrInfos.iterator.zipWithIndex.collect {
            case (aggrInfo, index) if extractIndexes.contains(index) =>
              val sessionId = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_SESSION_ID)
              val sessionRandomExtract = new SessionRandomExtract
              sessionRandomExtract.setTaskid(taskId)
              sessionRandomExtract.setSessionid(sessionId)
              sessionRandomExtract.setStartTime(StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_START_TIME))
              sessionRandomExtract.setSearchKeywords(StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_SEARCH_KEYWORDS))
              sessionRandomExtract.setClickCategoryIds(StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_CLICK_CATEGORY_IDS))
              sessionRandomExtractDAO.insert(sessionRandomExtract)
              (sessionId, sessionId)
          }.toList
          println("插入数据表sessionRandomExtract完成")
          extracted
        }

        // join in the action rows of the sampled sessions and store them in batches
        extractSessionIdsRDD.join(sessionDetailRDD).foreachPartition { partition =>
          val sessionDetails = new util.ArrayList[SessionDetail]
          partition.foreach { case (_, (_, row)) =>
            val sessionDetail = new SessionDetail
            sessionDetail.setTaskid(taskId)
            sessionDetail.setUserid(row.getLong(1))
            sessionDetail.setSessionid(row.getString(2))
            sessionDetail.setPageid(row.getLong(3))
            sessionDetail.setActionTime(row.getString(4))
            sessionDetail.setSearchKeyword(row.getString(5))
            sessionDetail.setClickCategoryId(if (row.get(6) != null) row.getLong(6) else 0)
            sessionDetail.setClickProductId(if (row.get(7) != null) row.getLong(7) else 0)
            sessionDetail.setOrderCategoryIds(row.getString(8))
            sessionDetail.setOrderProductIds(row.getString(9))
            sessionDetail.setPayCategoryIds(row.getString(10))
            sessionDetail.setPayProductIds(row.getString(11))
            sessionDetails.add(sessionDetail)
          }
          if (!sessionDetails.isEmpty) {
            // DAO obtained on the executor so it is not part of the serialized closure
            DAOFactory.getSessionDetailDAO.insertBatch(sessionDetails)
          }
        }
        println("插入数据表session_Detail完成")
      }
    } finally {
      timeSessionIdRDD.unpersist()
    }
  }

  //将数据进行转换，转换成（sessionId,Row）格式
  /**
   * Keeps only the actions of sessions that passed the filter.
   *
   * @param sessionIdActionRDD all `(sessionId, actionRow)` pairs
   * @param filterUserInfoRDD  `(sessionId, aggrInfo)` pairs of the filtered sessions
   * @return `(sessionId, actionRow)` for every action of a filtered session
   */
  def getSessionIdDetailRDD(sessionIdActionRDD: RDD[(String, Row)], filterUserInfoRDD: RDD[(String, String)]): RDD[(String, Row)] = {
    // inner join on sessionId: (sessionId, (aggrInfo, actionRow))
    val joined: RDD[(String, (String, Row))] = filterUserInfoRDD.join(sessionIdActionRDD)
    joined.map { case (sessionId, (_, actionRow)) => (sessionId, actionRow) }
  }



  /**
   * Requirement 2: turns the accumulator's bucket counts into ratios of the filtered session
   * count and stores them via the SessionAggrStat DAO.
   *
   * The stored record also carries the task id and the session count.
   *
   * @param count   number of sessions that passed the filter
   * @param counted accumulator value, a `bucket=count|bucket=count` string
   * @param taskId  id of the task the statistics belong to
   */
  def countedPercentage(count: Long, counted: String, taskId: Long): Unit = {
    // Ratio of one bucket, rounded to 4 decimals. The result is 0.0 in two cases:
    // - there are no sessions, where dividing would give NaN;
    // - the bucket is absent from `counted`, where parsing would fail.
    def ratio(bucket: String): Double = {
      val bucketCount = StringUtils.getFieldFromConcatString(counted, "\\|", bucket)
      if (count == 0L || bucketCount == null || bucketCount.isEmpty) 0.0
      else NumberUtils.formatDouble(bucketCount.toDouble / count, 4)
    }

    val sessionAggrStat = new SessionAggrStat

    // visit-length buckets
    sessionAggrStat.setVisit_length_1s_3s_ratio(ratio(Constants.TIME_PERIOD_1s_3s))
    sessionAggrStat.setVisit_length_4s_6s_ratio(ratio(Constants.TIME_PERIOD_4s_6s))
    sessionAggrStat.setVisit_length_7s_9s_ratio(ratio(Constants.TIME_PERIOD_7s_9s))
    sessionAggrStat.setVisit_length_10s_30s_ratio(ratio(Constants.TIME_PERIOD_10s_30s))
    sessionAggrStat.setVisit_length_30s_60s_ratio(ratio(Constants.TIME_PERIOD_30s_60s))
    sessionAggrStat.setVisit_length_1m_3m_ratio(ratio(Constants.TIME_PERIOD_1m_3m))
    sessionAggrStat.setVisit_length_3m_10m_ratio(ratio(Constants.TIME_PERIOD_3m_10m))
    sessionAggrStat.setVisit_length_10m_30m_ratio(ratio(Constants.TIME_PERIOD_10m_30m))
    sessionAggrStat.setVisit_length_30m_ratio(ratio(Constants.TIME_PERIOD_30m))

    // step-length buckets
    sessionAggrStat.setStep_length_1_3_ratio(ratio(Constants.STEP_PERIOD_1_3))
    sessionAggrStat.setStep_length_4_6_ratio(ratio(Constants.STEP_PERIOD_4_6))
    sessionAggrStat.setStep_length_7_9_ratio(ratio(Constants.STEP_PERIOD_7_9))
    sessionAggrStat.setStep_length_10_30_ratio(ratio(Constants.STEP_PERIOD_10_30))
    sessionAggrStat.setStep_length_30_60_ratio(ratio(Constants.STEP_PERIOD_30_60))
    sessionAggrStat.setStep_length_60_ratio(ratio(Constants.STEP_PERIOD_60))

    sessionAggrStat.setTaskid(taskId)
    sessionAggrStat.setSession_count(count)

    val sessionAggrStatDAO = DAOFactory.getSessionAggrStatDAO
    sessionAggrStatDAO.insert(sessionAggrStat)
    println("插入数据表sessionAggrStat成功")
  }

  /**
   * Requirement 1: keeps the sessions whose aggregate info satisfies the task parameters.
   *
   * The filter predicate (filterFun) also updates `accumulator` for every kept session. The
   * accumulator is therefore bumped again whenever the returned RDD is recomputed.
   *
   * @param value       `(sessionId, aggrInfo)` pairs
   * @param taskParam   task parameters as JSON
   * @param accumulator bucket counter updated by the predicate
   * @return the matching `(sessionId, aggrInfo)` pairs
   */
  def filterSessionUserInfoRDD(value: RDD[(String, String)], taskParam: JSONObject, accumulator: SessionAccumulator): RDD[(String, String)] = {
    // parameters that take part in filtering, in the order they are concatenated
    val filterParamNames = List(
      Constants.PARAM_START_AGE,
      Constants.PARAM_END_AGE,
      Constants.PARAM_PROFESSIONALS,
      Constants.PARAM_CITIES,
      Constants.PARAM_SEX,
      Constants.PARAM_KEYWORDS,
      Constants.PARAM_CATEGORY_IDS
    )

    // "name=value|" for every parameter that is set, concatenated
    val param: String = filterParamNames
      .map(name => (name, ParamUtils.getParam(taskParam, name)))
      .filter { case (_, paramValue) => StringUtils.isNotEmpty(paramValue) }
      .map { case (name, paramValue) => name + "=" + paramValue + "|" }
      .mkString

    value.filter(sessionInfo => filterFun(sessionInfo, param, accumulator))
  }

  /**
   * Filter predicate: checks one session's aggregate info against the concatenated task
   * parameters.
   *
   * For a matching session it also adds its visit-length and step-length buckets to
   * `accumulator`. A session shorter than one second (e.g. one with a single action, whose
   * start and end times coincide) falls in no visit-length bucket. Without that lower bound it
   * would be counted as 4s-6s.
   *
   * @param tuple       `(sessionId, aggrInfo)`
   * @param param       concatenated task parameters, `name=value|...`
   * @param accumulator bucket counter, updated for matching sessions only
   * @return whether the session satisfies every parameter
   */
  def filterFun(tuple: (String, String), param: String, accumulator: SessionAccumulator): Boolean = {
    val info = tuple._2
    // short-circuits in the same order as the individual checks
    val matches =
      ValidUtils.between(info, Constants.FIELD_AGE, param, Constants.PARAM_START_AGE, Constants.PARAM_END_AGE) &&
        ValidUtils.in(info, Constants.FIELD_PROFESSIONAL, param, Constants.PARAM_PROFESSIONALS) &&
        ValidUtils.equal(info, Constants.FIELD_CITY, param, Constants.PARAM_CITIES) &&
        ValidUtils.equal(info, Constants.FIELD_SEX, param, Constants.PARAM_SEX) &&
        ValidUtils.in(info, Constants.FIELD_SEARCH_KEYWORDS, param, Constants.PARAM_KEYWORDS) &&
        ValidUtils.in(info, Constants.FIELD_CLICK_CATEGORY_IDS, param, Constants.PARAM_CATEGORY_IDS)

    if (matches) {
      // visit length is stored in milliseconds; bucket it in whole seconds
      val visitLength = StringUtils.getFieldFromConcatString(info, "\\|", Constants.FIELD_VISIT_LENGTH).toLong / 1000
      // step length = number of actions in the session
      val stepLength = StringUtils.getFieldFromConcatString(info, "\\|", Constants.FIELD_STEP_LENGTH).toLong

      if (visitLength >= 1) {
        val visitPeriod =
          if (visitLength <= 3) Constants.TIME_PERIOD_1s_3s
          else if (visitLength <= 6) Constants.TIME_PERIOD_4s_6s
          else if (visitLength <= 9) Constants.TIME_PERIOD_7s_9s
          else if (visitLength <= 30) Constants.TIME_PERIOD_10s_30s
          else if (visitLength <= 60) Constants.TIME_PERIOD_30s_60s
          else if (visitLength <= 180) Constants.TIME_PERIOD_1m_3m
          else if (visitLength <= 600) Constants.TIME_PERIOD_3m_10m
          else if (visitLength <= 1800) Constants.TIME_PERIOD_10m_30m
          else Constants.TIME_PERIOD_30m
        accumulator.add(visitPeriod)
      }

      val stepPeriod =
        if (stepLength <= 3) Constants.STEP_PERIOD_1_3
        else if (stepLength <= 6) Constants.STEP_PERIOD_4_6
        else if (stepLength <= 9) Constants.STEP_PERIOD_7_9
        else if (stepLength <= 30) Constants.STEP_PERIOD_10_30
        else if (stepLength <= 60) Constants.STEP_PERIOD_30_60
        else Constants.STEP_PERIOD_60
      accumulator.add(stepPeriod)
    }

    matches
  }



  /**
   * Loads the rows of `user_visit_action` whose `Date` lies in the task's inclusive date range.
   *
   * The range is `[PARAM_START_DATE, PARAM_END_DATE]`. The bounds are passed to a Column
   * predicate as values, not spliced into SQL text, so a task parameter cannot change the query.
   *
   * @param sparkSession session that has the `user_visit_action` table or view registered
   * @param taskParam    task parameters as JSON
   * @return the matching action rows
   */
  def getActionRDDByDateRange(sparkSession: SparkSession, taskParam: JSONObject): RDD[Row] = {
    val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)
    val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)

    val actionDF: DataFrame = sparkSession.table("user_visit_action")
    // between is inclusive on both ends, i.e. Date >= startDate AND Date <= endDate
    actionDF.where(actionDF("Date").between(startDate, endDate)).rdd
  }

  //(sessionId, fullAggrInfo): per-session aggregate info joined with the user's info
  /**
   * Builds one aggregate info string per session and appends the session user's info.
   *
   * Per session (grouped by sessionId) it records:
   * - the distinct search keywords (action column 5) and clicked category ids (column 6);
   * - the visit length in milliseconds, between the earliest and latest action time (column 4);
   * - the step length (number of actions);
   * - the start time.
   *
   * The result is joined on user id (action column 1, `user_info` column 0) with age,
   * professional, city and sex from `user_info`.
   *
   * @param sparkSession    session used to query `user_info`
   * @param sessionIdAction `(sessionId, actionRow)` pairs
   * @return `(sessionId, fullAggrInfo)` where fullAggrInfo is a `field=value|field=value` string
   */
  def aggregateByUserId(sparkSession: SparkSession, sessionIdAction: RDD[(String, Row)]): RDD[(String, String)] = {
    val sessionUserInfo: RDD[(Long, String)] = sessionIdAction.groupByKey().mapPartitions { sessions =>
      // SimpleDateFormat is not thread-safe, so each partition uses its own instance
      val timeFormat = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      sessions.map { case (sessionId, rows) =>
        // Sets compare whole values. A substring test on the joined string would treat
        // e.g. category 1 as already present once 11 was recorded.
        val searchKeyWords = mutable.LinkedHashSet[String]()
        val clickCategoryIds = mutable.LinkedHashSet[Long]()
        var userId = 0L
        var startTime = new Date()
        var endTime = new Date(0L)
        var stepLength = 0

        for (row <- rows) {
          if (userId == 0L) userId = row.getLong(1)
          val searchWord = row.getString(5)
          if (StringUtils.isNotEmpty(searchWord)) searchKeyWords += searchWord
          if (row.get(6) != null) clickCategoryIds += row.getLong(6)
          val actionTime = DateUtils.parseTime(row.getString(4))
          if (actionTime.before(startTime)) startTime = actionTime
          if (actionTime.after(endTime)) endTime = actionTime
          stepLength += 1
        }

        val visitLength = endTime.getTime - startTime.getTime
        // The start time is formatted as "yyyy-MM-dd HH:mm:ss" rather than Date.toString, so
        // DateUtils.getDateHour can derive a "<yyyy-MM-dd>_<HH>" key from it.
        // NOTE(review): assumed to be the layout DateUtils.parseTime reads - confirm.
        val startTimeText = timeFormat.format(startTime)

        val aggrInfo = Constants.FIELD_SESSION_ID + "=" + sessionId + "|" +
          Constants.FIELD_SEARCH_KEYWORDS + "=" + searchKeyWords.mkString(",") + "|" +
          Constants.FIELD_CLICK_CATEGORY_IDS + "=" + clickCategoryIds.mkString(",") + "|" +
          Constants.FIELD_VISIT_LENGTH + "=" + visitLength + "|" +
          Constants.FIELD_STEP_LENGTH + "=" + stepLength + "|" +
          Constants.FIELD_START_TIME + "=" + startTimeText
        (userId, aggrInfo)
      }
    }

    // (userId, user row)
    val userInfoRDD: RDD[(Long, Row)] = sparkSession.sql("select * from user_info").rdd.map(row => (row.getLong(0), row))

    // (userId, (aggrInfo, userRow)) -> (sessionId, aggrInfo + user fields)
    sessionUserInfo.join(userInfoRDD).map { case (_, (aggrInfo, userInfo)) =>
      val fullAggrInfo = aggrInfo + "|" +
        Constants.FIELD_AGE + "=" + userInfo.getInt(3) + "|" +
        Constants.FIELD_PROFESSIONAL + "=" + userInfo.getString(4) + "|" +
        Constants.FIELD_CITY + "=" + userInfo.getString(5) + "|" +
        Constants.FIELD_SEX + "=" + userInfo.getString(6)
      val sessionId = StringUtils.getFieldFromConcatString(aggrInfo, "\\|", Constants.FIELD_SESSION_ID)
      (sessionId, fullAggrInfo)
    }
  }
}
